Debezium is a distributed platform for capturing changes in database tables and delivering them to downstream systems in real time. It is built on top of Apache Kafka and uses Kafka Connect to connect to databases and stream change data to Kafka topics. Debezium is a popular choice for implementing data pipelines and microservices architectures.
Here’s a summary of Debezium’s key features:
-
Real-time data capture: Debezium captures changes in database tables as they occur, enabling real-time data processing and analysis.
-
Change data capture (CDC): Debezium provides CDC functionality, allowing you to track the full history of changes in your database tables.
-
Kafka integration: Debezium uses Kafka Connect to stream change data to Kafka topics, making it easy to integrate with Kafka-based systems.
-
Database support: Debezium supports a wide range of databases: Db2 (Linux only), MongoDB, MySQL, Oracle Database (LogMiner), PostgreSQL, SQL Server (including Azure SQL DB).
-
Scalability: Debezium is horizontally scalable, allowing you to handle increasing data volumes by adding more Debezium servers.
See more at the official documentation page.
Learn about prerequisites at Debezium Supported Configurations
-
AMQ Streams Operator installed.
-
Install Kafka cluster in
my-kafka
namespace following the instructions in this paragraph Create Kafka Broker.
Let’s create a new project to include the Postgres DB and the Debezium instance:
oc new-project debezium
The following deployment uses the standard PostgreSQL 15 image available in the Red Hat Catalog.
A small tweak is required to enable the write-ahead log.
oc create configmap postgres-conf --from-file k8s/debezium/postgres.conf
oc apply -f k8s/debezium/01-postgres.yaml
The following commands will create a simple DB in Postgres:
set -x PGPOD (oc get pod -l app=postgres -ojsonpath='{.items[0].metadata.name}')
oc cp k8s/debezium/postgres-db-ddl.sql $PGPOD:/var/lib/pgsql/postgres-db-ddl.sql
oc exec -it $PGPOD -- psql -f /var/lib/pgsql/postgres-db-ddl.sql
Tip
|
If you are using Bash set the environment variable in this way: export PGPOD=$(oc get pod -l app=postgres -ojsonpath='{.items[0].metadata.name}')
|
At its core, Debezium is a Kafka connector deployed on a Kafka Connect distributed server.
The following commands have a double effect:
-
A Kafka Connect image containing the Debezium libraries
-
Run Kafka Connect against this image
oc create imagestream debezium-streams-connect
oc apply -f k8s/debezium/02-dbz-connect.yaml
To start the Debezium connector you have create the KafkaConnector
resource with the following command:
oc apply -f k8s/debezium/03-dbz-connector.yaml
Issue the following command to check that it’s running:
oc describe KafkaConnector dbz-pg-source-connector
You should see something similar:
Status: Conditions: Last Transition Time: 2023-11-25T14:45:44.792770738Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.128.1.190:8083 Name: dbz-pg-source-connector Tasks: Id: 0 State: RUNNING worker_id: 10.128.1.190:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: outbox.public.shipping
If everything works as expected, the `outbox.public.shipping' topic should contain the information from the `shipping' table in the Postgres DB.
The following command dump the topic content:
oc exec -n my-kafka -it my-cluster-kafka-0 -- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--topic outbox.public.shipping --from-beginning
In order to see the new messages flowing in real time, open a new terminal and insert a new record in the table:
set -x PGPOD (oc get pod -l app=postgres -ojsonpath='{.items[0].metadata.name}')
oc exec -it $PGPOD -- psql
On the psql
command prompt, issue:
\connect debezium;
INSERT INTO shipping VALUES (101, 'Premium','Pine Avenue');
exit
Debezium’s JDBC sink connector listens to Kafka topics containing database changes, translates them into SQL statements, and applies them to another relational database using a JDBC driver. This allows you to keep multiple databases in sync with your main database.
The following deployment uses the standard MySQL 8 image available in the Red Hat Catalog.
oc apply -f k8s/debezium/04-mysql.yaml
For sake of simplicity, the sink connector will be deployed in the same Kafka connect used by the source connector:
oc apply -f k8s/debezium/05-dbz-sink-connector.yaml
Open a shell in mysql container:
set -x MYSQLPOD (oc get pod -l app=mysql -ojsonpath='{.items[0].metadata.name}')
oc rsh $MYSQLPOD
Tip
|
If you are using Bash set the environment variable in this way: export MYSQLPOD=$(oc get pod -l app=postgres -ojsonpath='{.items[0].metadata.name}')
|
Launch the MySQL client and explore the table:
mysql -u$MYSQL_USER -p$MYSQL_PASSWORD
use mysqldbz;
select * from outbox_public_shipping;
The outbox pattern solves the problem of updating a table and sending a message to a message broker in an atomic fashion and it’s not possible to rely on the two-phase commit technology.
See more details in this article: Transactional Outbox.
Debezium is the perfect companion for implementing the Transaction Outbox pattern.
To see this in practice, update the payment table and the shipping with an atomic transaction.
Monitoring the kafka topic, you will notice that the message is sent only after the commit
command.
If you have closed the console consumer open it again in its own console:
oc exec -n my-kafka -it my-cluster-kafka-0 -- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--topic outbox.public.shipping --from-beginning
Open the postgres client:
set -x PGPOD (oc get pod -l app=postgres -ojsonpath='{.items[0].metadata.name}')
oc exec -it $PGPOD -- psql
Issue the following SQL commands, but wait a few seconds after the inserts to ensure that Debezium does not publish until after the commit statement:
\connect debezium;
BEGIN;
INSERT INTO payments ( order_id, payment_method, transaction_id, payment_amount, payment_status)
VALUES (102, 'Credit Card', 'tx12345', 37.50, 'CONFIRMED');
INSERT INTO shipping
VALUES (102, 'Standard', 'Maple Street');
COMMIT;
exit